package storage

import (
	"bytes"
	"errors"

	bolt "gitee.com/wanttobeamaster/bbolt"
	"gitee.com/wanttobeamaster/gridbase/pkg/util"
)

const (
	// LHeadDirection tells the push/pop helpers to operate on the head end
	// of a list.
	LHeadDirection        uint8  = 0
	// LTailDirection tells the push/pop helpers to operate on the tail end
	// of a list.
	LTailDirection        uint8  = 1
	// LItemMinIndex is not referenced in this file; presumably the lowest
	// item index a list may use — TODO confirm against callers.
	LItemMinIndex         uint64 = 1024
	// LItemMaxIndex is not referenced in this file; presumably the highest
	// item index a list may use — TODO confirm against callers.
	LItemMaxIndex         uint64 = 1<<64 - 1024
	// LItemInitIndex is the index that both Head and Tail of a freshly
	// created list start from (see newListMetaObj).
	LItemInitIndex        uint64 = 1<<32 - 512
	// LimitConcurrencyWrite is the value handed to util.NewLimiter when a
	// bolt list engine is constructed.
	LimitConcurrencyWrite        = 50000
)

// ListObj is the per-list metadata record stored under the list's meta key.
// Items themselves live under separate keys derived from an item index.
type ListObj struct {
	// Head is the item index of the first element; the pop helper reads the
	// item stored at Head and then increments it.
	Head uint64
	// Tail is the item index one past the last element; the push helper
	// writes the next tail item at Tail and then advances it.
	Tail uint64
	// Size is the number of elements currently in the list.
	Size uint64
}

// MarshalListObj encodes obj into a fixed 24-byte record made of three
// consecutive 8-byte slots holding Head, Tail and Size, in that order. Each
// slot is filled by util.Uint64ToBytes1.
func MarshalListObj(obj *ListObj) []byte {
	const fieldWidth = 8

	fields := [...]uint64{obj.Head, obj.Tail, obj.Size}
	encoded := make([]byte, len(fields)*fieldWidth)
	for slot, field := range fields {
		// The helper's result is discarded, as it always has been here.
		_ = util.Uint64ToBytes1(encoded[slot*fieldWidth:], field)
	}
	return encoded
}

// UnmarshalListObj decodes a record produced by MarshalListObj.
//
// raw must be exactly 24 bytes: three consecutive 8-byte slots holding Head,
// Tail and Size. Any other length is reported as an error instead of the
// former (nil, nil), which handed callers a nil *ListObj together with a nil
// error.
func UnmarshalListObj(raw []byte) (*ListObj, error) {
	const fieldWidth = 8

	if len(raw) != 3*fieldWidth {
		return nil, errors.New("storage: list meta record must be exactly 24 bytes")
	}

	// The length check above guarantees every slot below has a full 8 bytes
	// available, so the second result of util.BytesToUint64 is ignored just
	// as it was before.
	obj := &ListObj{}
	obj.Head, _ = util.BytesToUint64(raw[0*fieldWidth:])
	obj.Tail, _ = util.BytesToUint64(raw[1*fieldWidth:])
	obj.Size, _ = util.BytesToUint64(raw[2*fieldWidth:])

	return obj, nil
}

// boltListEngine implements the list commands on top of a bolt database.
// All list metadata and items are kept in the "boltListBucket" bucket.
type boltListEngine struct {
	// db is the bolt handle every View/Update/Batch transaction runs on.
	db      *bolt.DB
	// limiter is built from util.NewLimiter(LimitConcurrencyWrite) by the
	// constructor; none of the methods in this file consult it.
	limiter util.Limiter
}

// newBoltListEngine wraps db in a ListEngine, making sure the
// "boltListBucket" bucket exists first.
//
// The outcome of the bucket-creation transaction is not inspected: the
// constructor has no error result to report it through.
func newBoltListEngine(db *bolt.DB) ListEngine {
	ensureBucket := func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("boltListBucket"))
		return err
	}
	_ = db.Update(ensureBucket)

	engine := &boltListEngine{db: db}
	engine.limiter = *util.NewLimiter(LimitConcurrencyWrite)
	return engine
}

// ListMetaObj loads the metadata record of the list stored under key.
//
// It returns (nil, false, nil) when no metadata exists for key. The boolean
// result is always false in this implementation. A failure of the read
// transaction or of decoding the stored record is returned as the error.
func (e *boltListEngine) ListMetaObj(key []byte) (*ListObj, bool, error) {
	metaKey := RawKeyPrefix(key)

	var obj *ListObj
	err := e.db.View(func(tx *bolt.Tx) error {
		raw := tx.Bucket([]byte("boltListBucket")).Get(metaKey)
		if raw == nil {
			return nil
		}
		// Decode while the transaction is still open: the slice returned by
		// Get is only valid for the life of the transaction.
		decoded, err := UnmarshalListObj(raw)
		if err != nil {
			return err
		}
		obj = decoded
		return nil
	})
	if err != nil {
		return nil, false, err
	}
	return obj, false, nil
}

// newListMetaObj returns the metadata of a brand-new, empty list: Head and
// Tail both sit at LItemInitIndex and Size is zero.
func (e *boltListEngine) newListMetaObj() *ListObj {
	fresh := new(ListObj)
	fresh.Head = LItemInitIndex
	fresh.Tail = LItemInitIndex
	return fresh
}

// RawListKey builds the storage key of the item at position idx of the list
// named key: RawKeyPrefix(key), followed by util.DataTypeKey, followed by
// the bytes util.Uint64ToBytes produces for idx.
func (e *boltListEngine) RawListKey(key []byte, idx uint64) []byte {
	rawKey := append(RawKeyPrefix(key), util.DataTypeKey)
	encodedIdx, _ := util.Uint64ToBytes(idx)
	return append(rawKey, encodedIdx...)
}

// LIndex returns the element at position index of the list stored under key.
//
// index is zero-based from the head; a negative index counts back from the
// tail (-1 is the last element). It returns (nil, nil) when the list does not
// exist or index falls outside it, and ErrEmptyKey when key is empty. The
// returned slice is a private copy that stays valid after the call.
func (e *boltListEngine) LIndex(key []byte, index int64) ([]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}

	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	// Normalise by adding the size rather than negating index, so that even
	// math.MinInt64 is rejected instead of overflowing.
	size := int64(metaObj.Size)
	if index < 0 {
		index += size
	}
	if index < 0 || index >= size {
		return nil, nil
	}

	dataKey := e.RawListKey(key, metaObj.Head+uint64(index))

	var item []byte
	err = e.db.View(func(tx *bolt.Tx) error {
		// Values returned by Get are only valid while the transaction is
		// open, so copy before handing the bytes to the caller.
		if v := tx.Bucket([]byte("boltListBucket")).Get(dataKey); v != nil {
			item = make([]byte, len(v))
			copy(item, v)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return item, nil
}

// LInsert inserts value next to the first element equal to pivot in the list
// stored under key and returns the resulting list length.
//
// The new item is placed immediately after the pivot and pos is not
// consulted.
// NOTE(review): the previous comment said "before pivot", but the code has
// always shifted the elements behind the pivot and written the new item
// right after it. That placement is kept unchanged here; confirm what pos is
// meant to select before changing it.
//
// It returns ErrEmptyKey for an empty key, 0 when the list does not exist,
// and the unchanged length when pivot is not present. The search, the shift
// and the metadata update all run in a single write transaction, so a
// failure leaves the list untouched and is returned to the caller.
func (e *boltListEngine) LInsert(key []byte, pos int, pivot []byte, value []byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return 0, err
	}
	if metaObj == nil {
		return 0, nil
	}

	head, size := metaObj.Head, metaObj.Size
	err = e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		// Find the first element equal to pivot. Stopping with break keeps
		// the match position intact, so a pivot in the last slot is no
		// longer mistaken for "not found".
		var pivotIdx uint64
		found := false
		for i := uint64(0); i < size; i++ {
			if bytes.Equal(b.Get(e.RawListKey(key, head+i)), pivot) {
				pivotIdx, found = i, true
				break
			}
		}
		if !found {
			return nil
		}

		// Shift everything behind the pivot one slot towards the tail,
		// starting from the last element so nothing is overwritten.
		insertAt := pivotIdx + 1
		for j := size; j > insertAt; j-- {
			src := b.Get(e.RawListKey(key, head+j-1))
			moved := make([]byte, len(src))
			copy(moved, src)
			if err := b.Put(e.RawListKey(key, head+j), moved); err != nil {
				return err
			}
		}
		if err := b.Put(e.RawListKey(key, head+insertAt), value); err != nil {
			return err
		}

		updated := *metaObj
		updated.Tail++
		updated.Size++
		if err := b.Put(eMetaKey, MarshalListObj(&updated)); err != nil {
			return err
		}
		size = updated.Size
		return nil
	})
	if err != nil {
		return 0, err
	}
	return int64(size), nil
}

// LLen reports how many elements the list stored under key holds, as
// recorded in its metadata. A list without metadata has length 0; an empty
// key yields ErrEmptyKey.
func (e *boltListEngine) LLen(key []byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}

	switch meta, _, err := e.ListMetaObj(key); {
	case err != nil:
		return 0, err
	case meta == nil:
		return 0, nil
	default:
		return int64(meta.Size), nil
	}
}

// LPop removes and returns the element at the head of the list stored under
// key by delegating to lPopWithDirec with LHeadDirection.
//
//	head <----------------> tail
func (e *boltListEngine) LPop(key []byte) ([]byte, error) {
	return e.lPopWithDirec(key, LHeadDirection)
}

// lPopWithDirec removes one element from the list stored under key and
// returns it. direc selects the end: LHeadDirection pops the item at Head,
// any other value pops the item just before Tail.
//
// It returns ErrEmptyKey for an empty key and (nil, nil) when the list does
// not exist or its metadata records zero elements. When the last element is
// popped the metadata record is deleted as well. The returned slice is a
// private copy taken while the write transaction is still open.
func (e *boltListEngine) lPopWithDirec(key []byte, direc uint8) ([]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}

	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return nil, err
	}
	// A record with Size 0 has nothing to pop; decrementing it would wrap
	// the unsigned counters around.
	if metaObj == nil || metaObj.Size == 0 {
		return nil, nil
	}

	var item []byte
	err = e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		var eDataKey []byte
		if direc == LHeadDirection {
			eDataKey = e.RawListKey(key, metaObj.Head)
			metaObj.Head++
		} else {
			// Tail is exclusive, so step back first to reach the last item.
			metaObj.Tail--
			eDataKey = e.RawListKey(key, metaObj.Tail)
		}
		metaObj.Size--

		if metaObj.Size == 0 {
			// That was the only item left: drop the metadata too.
			if err := b.Delete(eMetaKey); err != nil {
				return err
			}
		} else if err := b.Put(eMetaKey, MarshalListObj(metaObj)); err != nil {
			return err
		}

		// Copy the value before deleting it and before the transaction
		// ends; bolt-owned slices must not be used afterwards.
		item = append([]byte{}, b.Get(eDataKey)...)
		return b.Delete(eDataKey)
	})
	if err != nil {
		return nil, err
	}
	return item, nil
}

// LPush pushes values onto the head of the list stored under key by
// delegating to lPushWithDirec with LHeadDirection, and returns what that
// helper returns. Values are pushed one after another, so the last value
// ends up at the head.
func (e *boltListEngine) LPush(key []byte, values ...[]byte) (int64, error) {
	return e.lPushWithDirec(LHeadDirection, key, values...)
}

// lPushWithDirec appends values to one end of the list stored under key and
// returns the new list length. The list is created when it does not exist.
//
// With LHeadDirection the values are pushed one by one in front of the
// current head, so the last value becomes the new head. With any other
// direc they are appended after the current tail in order.
//
// It returns ErrEmptyKey for an empty key. Calling it without values writes
// nothing and returns the current length. A failed write transaction is
// returned as the error instead of being reported as success.
func (e *boltListEngine) lPushWithDirec(direc uint8, key []byte, values ...[]byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}

	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return 0, err
	}
	if metaObj == nil {
		metaObj = e.newListMetaObj()
	}
	// Nothing to push: avoid persisting a metadata record for an empty list.
	if len(values) == 0 {
		return int64(metaObj.Size), nil
	}

	// start is the boundary index the first value is placed relative to.
	itemCnt := uint64(len(values))
	start := metaObj.Tail
	if direc == LHeadDirection {
		start = metaObj.Head
		metaObj.Head -= itemCnt
	} else {
		metaObj.Tail += itemCnt
	}
	metaObj.Size += itemCnt

	err = e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		// Metadata and items are written in the same transaction.
		if err := b.Put(eMetaKey, MarshalListObj(metaObj)); err != nil {
			return err
		}

		index := start
		for _, value := range values {
			var eDataKey []byte
			if direc == LHeadDirection {
				// Head grows downwards: step first, then write.
				index--
				eDataKey = e.RawListKey(key, index)
			} else {
				// Tail is exclusive: write first, then step.
				eDataKey = e.RawListKey(key, index)
				index++
			}
			if err := b.Put(eDataKey, value); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return 0, err
	}
	return int64(metaObj.Size), nil
}

// LPushX pushes a single value onto the head of the list stored under key by
// delegating to lPushWithDirec with LHeadDirection.
// NOTE(review): as written this behaves exactly like LPush with one value,
// including creating the list when key has no metadata yet. Confirm whether
// an "only when the list already exists" check is intended here.
func (e *boltListEngine) LPushX(key []byte, value []byte) (int64, error) {
	return e.lPushWithDirec(LHeadDirection, key, value)
}

// LRange returns the elements of the list stored under key between the
// positions begin and end, both inclusive and zero-based from the head.
//
// Negative positions count back from the tail (-1 is the last element). A
// begin before the first element is clamped to the head and an end past the
// last element is clamped to the tail. An end before the first element
// selects nothing. It returns (nil, nil) when the list does not exist or the
// range is empty, and ErrEmptyKey when key is empty. Every returned slice is
// a private copy that stays valid after the call; a slot whose item key is
// missing is returned as nil.
func (e *boltListEngine) LRange(key []byte, begin int64, end int64) ([][]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}

	// Cheap rejection of ranges that are empty whatever the list size is,
	// before touching the database.
	if begin > end && (end > 0 || begin < 0) {
		return nil, nil
	}

	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	size := int64(metaObj.Size)
	if begin < 0 {
		begin += size
		if begin < 0 {
			begin = 0
		}
	}
	if end < 0 {
		// Stays negative when end lies before the head; the emptiness check
		// below then rejects the range.
		end += size
	} else if end >= size {
		end = size - 1
	}
	if begin >= size || begin > end {
		return nil, nil
	}

	first := metaObj.Head + uint64(begin)
	retSlice := make([][]byte, end-begin+1)
	err = e.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))
		for i := range retSlice {
			// Copy inside the transaction: slices returned by Get must not
			// be used once it has ended.
			if v := b.Get(e.RawListKey(key, first+uint64(i))); v != nil {
				retSlice[i] = make([]byte, len(v))
				copy(retSlice[i], v)
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return retSlice, nil
}

// LRem removes elements equal to value from the list stored under key.
//
// count selects how many and from which end:
//   - count > 0 removes up to count matches scanning from head to tail;
//   - count < 0 removes up to -count matches scanning from tail to head;
//   - count == 0 removes every match.
//
// The surviving elements are rewritten, in their original order, as a fresh
// list starting at LItemInitIndex. When nothing survives, the metadata
// record is deleted. Everything happens in one write transaction, so a
// failure leaves the stored list untouched and is returned as the error.
//
// It returns ErrEmptyKey for an empty key and 0 when the list does not
// exist. Otherwise the result is the number of elements left in the list.
// NOTE(review): the return value is the remaining length, as before, not the
// number of removed elements — confirm which one callers expect.
func (e *boltListEngine) LRem(key []byte, count int64, value []byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return 0, err
	}
	if metaObj == nil {
		return 0, nil
	}

	var remaining uint64
	err = e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		// Take a private copy of every element and clear its old slot. This
		// must run in a writable transaction: Delete fails in a read-only
		// one, which used to leave all old slots behind.
		nodes := make([][]byte, metaObj.Size)
		for i := range nodes {
			eDataKey := e.RawListKey(key, metaObj.Head+uint64(i))
			if v := b.Get(eDataKey); v != nil {
				nodes[i] = make([]byte, len(v))
				copy(nodes[i], v)
			}
			if err := b.Delete(eDataKey); err != nil {
				return err
			}
		}

		// Decide which elements go, scanning in the direction count asks for.
		limit := count
		if limit < 0 {
			limit = -limit
		}
		drop := make([]bool, len(nodes))
		var dropped int64
		mark := func(i int) {
			if (count == 0 || dropped != limit) && bytes.Equal(nodes[i], value) {
				drop[i] = true
				dropped++
			}
		}
		if count < 0 {
			for i := len(nodes) - 1; i >= 0; i-- {
				mark(i)
			}
		} else {
			for i := range nodes {
				mark(i)
			}
		}

		kept := make([][]byte, 0, len(nodes))
		for i, node := range nodes {
			if !drop[i] {
				kept = append(kept, node)
			}
		}

		// An emptied list keeps no metadata, matching what popping the last
		// element does.
		if len(kept) == 0 {
			return b.Delete(eMetaKey)
		}

		// Rebuild the list from the initial index. Tail is exclusive, so it
		// sits exactly Size slots after Head.
		fresh := e.newListMetaObj()
		fresh.Size = uint64(len(kept))
		fresh.Tail = fresh.Head + fresh.Size
		if err := b.Put(eMetaKey, MarshalListObj(fresh)); err != nil {
			return err
		}
		for i, node := range kept {
			if err := b.Put(e.RawListKey(key, fresh.Head+uint64(i)), node); err != nil {
				return err
			}
		}
		remaining = fresh.Size
		return nil
	})
	if err != nil {
		return 0, err
	}
	return int64(remaining), nil
}

// LSet overwrites the element at position index of the list stored under key
// with value.
//
// index is zero-based from the head; a negative index counts back from the
// tail (-1 is the last element). It returns ErrEmptyKey for an empty key and
// an error when the list does not exist or index lies outside it. Nothing is
// written in those cases, so no item can end up outside the range described
// by the list metadata.
func (e *boltListEngine) LSet(key []byte, index int64, value []byte) error {
	if len(key) == 0 {
		return ErrEmptyKey
	}

	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return err
	}
	if metaObj == nil {
		return errors.New("storage: no such list")
	}

	size := int64(metaObj.Size)
	if index < 0 {
		index += size
	}
	if index < 0 || index >= size {
		return errors.New("storage: list index out of range")
	}

	eDataKey := e.RawListKey(key, metaObj.Head+uint64(index))
	return e.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket([]byte("boltListBucket")).Put(eDataKey, value)
	})
}

// LTrim shrinks the list stored under key so that only the elements between
// the positions begin and end (both inclusive, zero-based from the head)
// remain.
//
// Negative positions count back from the tail (-1 is the last element). A
// begin before the first element is clamped to the head and an end past the
// last element is clamped to the tail. When the resulting range is empty —
// begin past the tail, begin after end, or end before the head — the whole
// list is removed, including its metadata.
//
// It returns ErrEmptyKey for an empty key and nil, without writing anything,
// when the list does not exist. Metadata and item deletions run in a single
// write transaction.
func (e *boltListEngine) LTrim(key []byte, begin int64, end int64) error {
	if len(key) == 0 {
		return ErrEmptyKey
	}

	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return err
	}
	if metaObj == nil {
		// No list, nothing to trim.
		return nil
	}

	head, size := metaObj.Head, int64(metaObj.Size)

	if begin < 0 {
		begin += size
		if begin < 0 {
			begin = 0
		}
	}
	if end < 0 {
		// Stays negative when end lies before the head, which makes the
		// range empty below.
		end += size
	} else if end >= size {
		end = size - 1
	}

	// Express "keep nothing" as an empty range sitting after the last
	// element, so the front-deletion loop below clears every item.
	delKey := begin >= size || begin > end
	if delKey {
		begin, end = size, size-1
	}

	return e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		if delKey {
			if err := b.Delete(eMetaKey); err != nil {
				return err
			}
		} else {
			// Both bounds are derived from the original head; Tail is
			// exclusive.
			trimmed := ListObj{
				Head: head + uint64(begin),
				Tail: head + uint64(end) + 1,
				Size: uint64(end - begin + 1),
			}
			if err := b.Put(eMetaKey, MarshalListObj(&trimmed)); err != nil {
				return err
			}
		}

		// Delete the items in front of the kept range.
		for i := int64(0); i < begin; i++ {
			if err := b.Delete(e.RawListKey(key, head+uint64(i))); err != nil {
				return err
			}
		}
		// Delete the items behind the kept range.
		for i := end + 1; i < size; i++ {
			if err := b.Delete(e.RawListKey(key, head+uint64(i))); err != nil {
				return err
			}
		}
		return nil
	})
}

// RPop removes and returns the element at the tail of the list stored under
// key by delegating to lPopWithDirec with LTailDirection.
func (e *boltListEngine) RPop(key []byte) ([]byte, error) {
	return e.lPopWithDirec(key, LTailDirection)
}

// RPush appends values, in order, to the tail of the list stored under key
// by delegating to lPushWithDirec with LTailDirection, and returns what that
// helper returns.
func (e *boltListEngine) RPush(key []byte, values ...[]byte) (int64, error) {
	return e.lPushWithDirec(LTailDirection, key, values...)
}

// RPushX appends a single value to the tail of the list stored under key by
// delegating to lPushWithDirec with LTailDirection.
//
// It uses a pointer receiver like every other method of boltListEngine, so
// the engine (including its limiter) is no longer copied on each call.
// NOTE(review): as written this behaves exactly like RPush with one value,
// including creating the list when key has no metadata yet. Confirm whether
// an "only when the list already exists" check is intended here.
func (e *boltListEngine) RPushX(key []byte, value []byte) (int64, error) {
	return e.lPushWithDirec(LTailDirection, key, value)
}
